package com.heima.kafka.quickstart;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/** 消费者 */
public class ConsumerQuickStart {

  public static void main(String[] args) {
    // 1.添加kafka的配置信息
    Properties properties = new Properties();
    // kafka的连接地址
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
    // 消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
    // 消息的反序列化器
    properties.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    // 2.消费者对象
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

    // 3.订阅主题
    consumer.subscribe(Collections.singletonList("itheima-topic"));

    System.out.println("订阅成功");

    // 当前线程一直处于监听状态
    while (true) {
      // 4.获取消息
      ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println(consumerRecord.key());
        System.out.println(consumerRecord.value());
      }
    }
  }
}
